package com.persagy.iot.app

import com.persagy.iot.bean.IOTData
import org.apache.flink.api.common.functions.AggregateFunction

class IOTAgg() extends AggregateFunction[IOTData, IOTData, IOTData]{

  override def createAccumulator(): IOTData = null

  override def add(in: IOTData, acc: IOTData): IOTData = {

    if (acc == null){
      in
    } else {
      if (in.eventTime > acc.eventTime){
        acc.copy(eventTime = in.eventTime, sysTime = in.sysTime, value = in.value)
      } else {
        acc
      }
    }
  }

  override def getResult(acc: IOTData): IOTData = acc

  override def merge(acc: IOTData, acc1: IOTData): IOTData = {
    if (acc == null){
      acc1
    } else if (acc1 == null) {
      acc
    } else {
      if (acc.eventTime > acc1.eventTime){
        acc1.copy(eventTime = acc.eventTime, sysTime = acc.sysTime, value = acc.value)
      } else {
        acc.copy(eventTime = acc1.eventTime, sysTime = acc1.sysTime, value = acc1.value)
      }
    }
  }
}
